[HUDI-4445] S3 Incremental source improvements #6176
vamshigv wants to merge 7 commits into apache:master
Conversation
Force-pushed from 305c9e0 to e434c6b (compare)
.rdd().toJavaRDD().mapPartitions(fileListIterator -> {
  List<String> cloudFilesPerPartition = new ArrayList<>();
  fileListIterator.forEachRemaining(row -> {
    final Configuration configuration = serializableConfiguration.newCopy();
Why create a copy again? I don't see any config modification happening within the executor. Why not simply pass serializableConfiguration?
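(A minimal sketch of that suggestion, assuming Hudi's SerializableConfiguration exposes the wrapped conf via get(); names follow the diff above:)

.mapPartitions(fileListIterator -> {
  List<String> cloudFilesPerPartition = new ArrayList<>();
  // Reuse the wrapped Configuration directly; no per-row newCopy() is needed
  // since nothing on the executor mutates it.
  final Configuration configuration = serializableConfiguration.get();
  fileListIterator.forEachRemaining(row -> {
    // ... per-row handling using the shared configuration ...
  });
  return cloudFilesPerPartition.iterator();
})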
Dataset ds = addPartitionColumn(dataFrameReader.load(cloudFiles.toArray(new String[0])), cloudFiles);
dataset = Option.of(ds);
}
LOG.warn("Extracted distinct files " + cloudFiles.size()
I assume this was for testing; change the log to debug level?
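(A sketch of the suggested change:)

// Guard the message so the concatenation is skipped unless debug logging is on.
if (LOG.isDebugEnabled()) {
  LOG.debug("Extracted distinct files " + cloudFiles.size());
}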
static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";

// ToDo make it a list of extensions
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
Is this a list of supported source data file extensions, e.g., .json, .parquet, .avro?
Suggested change:
- static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
+ static final String S3INCR_FILE_EXTENSIONS_OPTIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
The constant name should align with the actual key, and be suffixed OPTIONS since it is a key, not the extensions themselves.
// ToDo make it a list of extensions
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
Suggested change:
- static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
+ // Add a comment on the purpose of this config and rename as below
+ static final String ADD_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.add.source.partition.column";
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
static final Boolean DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN = true;
Have we fully tested this change? If not, I would suggest keeping the default false for now.
private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
  if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
      && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
    String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
Return early or log an error/warning if partitionKey is null or empty?
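(A sketch of such a guard, reusing the names from the diff; the log message is illustrative:)

String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
if (StringUtils.isNullOrEmpty(partitionKey)) {
  // Nothing to attach; warn and return the dataset unchanged.
  LOG.warn("Partition key resolved to null/empty; skipping source partition column.");
  return ds;
}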
List<String> nestedPartition = Arrays.stream(filePath.split("/"))
    .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
if (nestedPartition.size() > 1) {
  throw new HoodieException("More than one level of partitioning exists");
Is it planned to be supported sometime in the future? If yes, let's create a tracking JIRA for that.
Multi-level partitioning is very common, so is this a major limitation? If we push this out, how would it affect existing users?
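(One hypothetical way to lift the limitation, sketched only for discussion and not what this PR implements: keep all matching levels and join them instead of throwing.)

List<String> nestedPartition = Arrays.stream(filePath.split("/"))
    .filter(level -> level.contains(partitionPathPattern))
    .collect(Collectors.toList());
// e.g. year=2022/month=07/day=28 yields three levels; join them all
// rather than rejecting paths with more than one.
String partitionValue = String.join("/", nestedPartition);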
  return ds;
}

private Column s3EventsColumnFilter(String fileFormat) {
A minor suggestion: extract these kinds of methods into a separate util class to keep this class plain and simple. Or, if you prefer to keep them in this class for readability, move them to the bottom (i.e., after the call site) for a linear flow.
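(A sketch of the extraction; the util class name and filter body are illustrative:)

// Hypothetical util class that keeps the source class plain and simple.
final class S3EventsMetaUtils {
  private S3EventsMetaUtils() {
  }

  static Column s3EventsColumnFilter(String fileFormat) {
    // e.g. match object keys by the configured file format suffix
    return functions.col("s3.object.key").endsWith(fileFormat);
  }
}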
xushiyan left a comment:
Quick-skimmed the code; have some comments.
if (checkExists) {
  FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
  try {
    if (fs.exists(new Path(decodeUrl))) {
Creating a Hadoop Path carries much more memory overhead than normal instantiation. If it's just for an existence check, let's find a better way.
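(The Hadoop FileSystem API still needs a Path for exists(), so that allocation is hard to avoid outright; a related saving, sketched here with assumed variable names and column order, is caching FileSystem handles per bucket instead of re-resolving them per row:)

Map<String, FileSystem> fsCache = new HashMap<>();
fileListIterator.forEachRemaining(row -> {
  String bucket = row.getString(0);                               // assumed column order
  String decodeUrl = s3Prefix + bucket + "/" + row.getString(1);  // assumed column order
  FileSystem fs = fsCache.computeIfAbsent(bucket, b -> FSUtils.getFs(s3Prefix + b, configuration));
  try {
    if (fs.exists(new Path(decodeUrl))) {
      cloudFilesPerPartition.add(decodeUrl);
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed existence check for " + decodeUrl, e);
  }
});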
.filter(filterColumn)
.select("s3.bucket.name", "s3.object.key")
.distinct()
.rdd().toJavaRDD().mapPartitions(fileListIterator -> {
Why convert to RDD? You should be able to do mapPartitions with a Dataset too.
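(A sketch of the Dataset-native form using mapPartitions with an explicit encoder; variable names are assumptions:)

Dataset<String> cloudFiles = s3Events
    .filter(filterColumn)
    .select("s3.bucket.name", "s3.object.key")
    .distinct()
    .mapPartitions((MapPartitionsFunction<Row, String>) fileListIterator -> {
      List<String> cloudFilesPerPartition = new ArrayList<>();
      // ... same per-row filtering as the RDD version ...
      return cloudFilesPerPartition.iterator();
    }, Encoders.STRING());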
Priority is to land #6228 ahead of this; this one can make it into the next release.
…-S3-Incremental-Source
  return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
}

public RawTripTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException {
RawTripTestPayload assumes some form of trips schema. If you look at its constructor, we don't use the schema. And its APIs assume a few things about the schema. Should we keep all this out of HoodieTestDataGenerator?
This is an old comment. Please check if it's still valid.
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
| .requiredString("eventSource") | ||
| .requiredString("eventName") | ||
| .name("s3") |
Let's extract all these strings to constants.
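(For example, with illustrative constant names:)

private static final String FIELD_EVENT_SOURCE = "eventSource";
private static final String FIELD_EVENT_NAME = "eventName";
private static final String FIELD_S3 = "s3";
// ... then in the schema builder:
.requiredString(FIELD_EVENT_SOURCE)
.requiredString(FIELD_EVENT_NAME)
.name(FIELD_S3)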
  dos.write(jsonData.getBytes());
} finally {
  dos.flush();
  dos.close();
Will this close the ByteArrayOutputStream too?
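(It will: DataOutputStream inherits FilterOutputStream.close(), which closes the wrapped stream, and ByteArrayOutputStream.close() is a no-op anyway. A try-with-resources sketch makes the intent explicit:)

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos)) {
  dos.write(jsonData.getBytes());
} // dos is closed first, which flushes and also closes the wrapped baos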
/**
 * Generic class for specific payload implementations to inherit from.
 */
public abstract class GenericTestPayload {
Maybe rename to AbstractJsonTestPayload? It's essentially for JSON data, right?
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
.requiredString("eventSource")
.requiredString("eventName")
.requiredString("_row_key")
.name("s3")
Preferably extract all these as static string constants.
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
  Pair<Option<Dataset<Row>>, String> sourceMetadata = fetchMetadata(lastCkptStr, sourceLimit);
  if (!sourceMetadata.getKey().isPresent()) {
List<String> cloudFilesPerPartition = new ArrayList<>();
final Configuration configuration = serializableConfiguration.newCopy();
fileListIterator.forEachRemaining(row -> {
  // TODO: configuration is updated in the getFs call. check if new copy is needed w.r.t to getFs.
}

@Test
public void testHoodieIncrSource() throws IOException {
Maybe rename to testS3EventsHoodieIncrSource?
this.dataSize = jsonData.length();
Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
I recall this logic has been refactored in the current RawTripTestPayload.
.mapPartitions((MapPartitionsFunction<Row, String>) fileListIterator -> {
.rdd()
// JavaRDD simplifies coding with collect and suitable mapPartitions signature. check if this can be avoided.
.toJavaRDD()
.mapPartitions(fileListIterator -> {
We usually prefer the high-level DataFrame APIs. How is it actually beneficial to convert to an RDD here? I don't quite get the comment.
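(For what it's worth, collecting does not require JavaRDD either; a Dataset<String> produced by mapPartitions can be gathered directly:)

List<String> cloudFiles = cloudFilesDataset.collectAsList(); // no RDD hop needed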
  }
}
/**
 * Test payload for S3 event here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
 */
public class S3EventTestPayload extends GenericTestPayload implements HoodieRecordPayload<S3EventTestPayload> {
I'd suggest just testing with DefaultHoodieRecordPayload and a specific S3 event schema instead of creating a new test payload, as we want to test as close to the real scenario as possible. Besides, we don't couple a payload with a schema; a payload is only responsible for how to merge.
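(A sketch of that suggestion; generateS3EventRecord is a hypothetical helper producing a GenericRecord conforming to the S3 event schema:)

GenericRecord s3Event = generateS3EventRecord(key, commitTime); // hypothetical helper
HoodieRecord<DefaultHoodieRecordPayload> record =
    new HoodieAvroRecord<>(key, new DefaultHoodieRecordPayload(Option.of(s3Event)));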
There is a lot of existing misuse of RawTripTestPayload; see https://issues.apache.org/jira/browse/HUDI-6164.
So you may want to decouple the improvement changes from the payload changes.
xushiyan left a comment:
For whatever improvements are done for the S3 incr source, we should make the same for the GCS incr source?
Closing this as not needed.
What is the purpose of the pull request
S3 Incremental source improvements:
Brief change log
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.